#include "config.h"

#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"
#include "volume_proto.h"

#include "lsv_wbuffer_internal.h"

/*
 * Allocate the storage chunks that back one write-buffer segment.
 *
 * Requests LSV_WBUF_CHUNK_NUM * 2 chunks of LSV_WBUFFER_STORAGE_TYPE in a
 * single batch; the batch allocator fills chunk_block->stor_chunk_id.
 *
 * Returns the result of lsv_volume_chunk_malloc_batch() unchanged; any
 * non-zero value is treated as a failure.
 */
int lsv_wal_malloc_i(lsv_volume_proto_t *volume_proto, lsv_wbuf_segment_t * chunk_block){
        int ret = lsv_volume_chunk_malloc_batch(volume_proto,
                                                LSV_WBUFFER_STORAGE_TYPE,
                                                LSV_WBUF_CHUNK_NUM * 2,
                                                chunk_block->stor_chunk_id);

        if (unlikely(ret != 0)) {
                GOTO(EXIT, ret);
        }

EXIT:
        return ret;
}

/*
 * Allocate the storage chunks for a write-buffer segment.
 *
 * Thin wrapper around lsv_wal_malloc_i(). Returns that function's result
 * unchanged; any non-zero value is treated as a failure.
 *
 * NOTE: the call used to be bracketed by lsv_wrlock()/lsv_rwunlock() on the
 * wb_vol_rwlock of volume_proto->wbuf. That locking is disabled, so no lock
 * is taken here.
 */
int lsv_wal_malloc(lsv_volume_proto_t * volume_proto, lsv_wbuf_segment_t * chunk_block){
        int ret;

        ret = lsv_wal_malloc_i(volume_proto, chunk_block);
        if(unlikely(ret)){
                GOTO(EXIT, ret);
        }
EXIT:
        return ret;
}

/*
 * Append one write IO as a record to the log of the current segment.
 *
 * The record is staged in a temporary buffer of malloc_size bytes: a
 * wbuf_chunk_block_head at offset 0 (timestamp, LBA = io->offset,
 * size = real_size, crc = 0) and real_size payload bytes copied from buf,
 * starting LSV_WBUF_PAGE_SIZE bytes into the buffer. Both sizes come from
 * get_local_data_io_write_size().
 *
 * The staged buffer is then written with lsv_volume_chunk_update(), starting
 * at the segment's chunk/page cursor and split at every chunk boundary. The
 * cursor in the segment is advanced after each successful write.
 *
 * Returns real_size on success. On failure returns the error reported by
 * get_local_data_io_write_size(), -ENOMEM if the staging buffer cannot be
 * allocated, or -1 if a chunk write fails (the cursor then reflects only the
 * pieces already written).
 */
int lsv_wal_append_io(lsv_volume_proto_t *volume_proto, const io_t *io, const buffer_t *buf) {
        int err = 0;
        lsv_wbuf_segment_t *segment = NULL;
        wbuf_chunk_block_head * wc_bh = NULL;

        lsv_s8_t *tmp_buf = NULL;
        lsv_s8_t *buf_ptr = NULL;
        lsv_u32_t malloc_size = 0;
        lsv_u32_t real_size = 0;

        lsv_s32_t stor_chunk_cur = 0;
        lsv_s32_t page_idx = 0;

        lsv_s32_t chunk_left_size = 0;
        lsv_s32_t left = 0;
        lsv_s32_t step = 0;

        segment = get_current_chunk_block(volume_proto);

        err = get_local_data_io_write_size(volume_proto, io, &malloc_size, &real_size);
        if(unlikely(err)){
                GOTO(EXIT, err);
        }

        err = ymalloc((void **)&tmp_buf, malloc_size);
        if(unlikely(err)){
                err = -ENOMEM;
                GOTO(EXIT, err);
        }

        /* Record header lives at the very start of the staging buffer. */
        wc_bh = (wbuf_chunk_block_head *)tmp_buf;
        gettimeofday(&wc_bh->tv, NULL);
        wc_bh->lba= io->offset;
        wc_bh->size = real_size;
        wc_bh->crc = 0;

        /* Payload follows the header, one log page into the buffer. */
        mbuffer_get(buf, (lsv_s8_t *) (tmp_buf + LSV_WBUF_PAGE_SIZE) , real_size);

        stor_chunk_cur = segment->stor_chunk_cur;
        page_idx = segment->stor_chunk_page_idx;
        left = malloc_size;
        buf_ptr = tmp_buf;

        while(left > 0){
                /* Bytes still free in the current chunk. */
                chunk_left_size = (LSV_WBUF_PAGE_NUM - page_idx) * LSV_WBUF_PAGE_SIZE;

                /*
                 * Write as much as fits in this chunk. The remaining byte
                 * count must be decremented by what is written; recomputing
                 * it from malloc_size is wrong as soon as a record touches a
                 * third chunk.
                 */
                step = (chunk_left_size < left) ? chunk_left_size : left;
                left -= step;

                DINFO("wal: lsn %llu off %u size %u\n", (LLU)io->lsn, page_idx * LSV_PAGE_SIZE, step);

                /*
                 * NOTE(review): the byte offset is computed with LSV_PAGE_SIZE
                 * while the page accounting uses LSV_WBUF_PAGE_SIZE - confirm
                 * the two constants are equal.
                 * NOTE(review): stor_chunk_cur is not range-checked against
                 * the size of stor_chunk_id[] - confirm callers rotate the
                 * segment before it fills up.
                 */
                lsv_volume_io_t vio;
                lsv_volume_io_init(&vio,
                                   segment->stor_chunk_id[stor_chunk_cur],
                                   page_idx * LSV_PAGE_SIZE,
                                   step,
                                   LSV_WBUFFER_STORAGE_TYPE);
                err = lsv_volume_chunk_update(volume_proto, &vio, (lsv_s8_t *) buf_ptr);
                if (err) {
                        DERROR("write chunk_buf[%d] into disk failed, err: %d\n", stor_chunk_cur, err);
                        err = -1;
                        goto EXIT;
                }

                volume_proto->wbuf_write_count++;

                buf_ptr += step;

                if(chunk_left_size == step){
                        /* Chunk is full: continue at page 0 of the next one. */
                        page_idx = 0;
                        segment->stor_chunk_page_idx = 0;

                        stor_chunk_cur ++;
                        segment->stor_chunk_cur++;
                }else{
                        page_idx += (step / LSV_WBUF_PAGE_SIZE);
                        segment->stor_chunk_page_idx = page_idx;
                }
        }

        /* Success: report the payload size. */
        err = real_size;

        EXIT:
        /* Early failures get here before the staging buffer exists. */
        if (tmp_buf != NULL) {
                yfree((void **)&tmp_buf);
        }
        return err;
}

/*
 * Write an end-of-log marker page at the current cursor of the current
 * segment.
 *
 * The marker is one LSV_WBUF_PAGE_SIZE page whose header has lba, size and
 * crc set to 0 and both timestamp fields set to LSV_WBUF_MAX_TIMESTAMP;
 * lsv_wal_load_redo() tests for exactly this pattern. The segment cursor is
 * not advanced.
 *
 * Returns 0 on success, -ENOMEM if the page buffer cannot be allocated, or
 * -1 if the chunk write fails.
 */
int lsv_wal_append_end(lsv_volume_proto_t *volume_proto) {
        int err = 0;
        lsv_wbuf_segment_t *segment = NULL;
        lsv_s8_t *tmp_buf = NULL;
        lsv_s32_t stor_chunk_cur = 0;
        lsv_s32_t page_idx = 0;
        wbuf_chunk_block_head * wc_bh = NULL;
        lsv_volume_io_t vio;

        segment = get_current_chunk_block(volume_proto);

        stor_chunk_cur = segment->stor_chunk_cur;
        page_idx = segment->stor_chunk_page_idx;

        err = ymalloc((void **)&tmp_buf, LSV_WBUF_PAGE_SIZE);
        if(unlikely(err)){
                err = -ENOMEM;
                GOTO(EXIT, err);
        }

        /*
         * The whole page goes to disk, so clear it first; otherwise the bytes
         * after the header would be whatever the allocator handed back.
         */
        memset(tmp_buf, 0, LSV_WBUF_PAGE_SIZE);

        wc_bh = (wbuf_chunk_block_head *)tmp_buf;
        wc_bh->tv.tv_sec = LSV_WBUF_MAX_TIMESTAMP;
        wc_bh->tv.tv_usec = LSV_WBUF_MAX_TIMESTAMP;
        wc_bh->lba = 0;
        wc_bh->size = 0;
        wc_bh->crc = 0;

        lsv_volume_io_init(&vio,
                           segment->stor_chunk_id[stor_chunk_cur],
                           page_idx * LSV_PAGE_SIZE,
                           LSV_WBUF_PAGE_SIZE,
                           LSV_WBUFFER_STORAGE_TYPE);
        err = lsv_volume_chunk_update(volume_proto, &vio, (lsv_s8_t *) tmp_buf);
        if (err) {
                DERROR("write chunk_buf[%d] into disk failed, err: %d\n", stor_chunk_cur, err);
                err = -1;
                goto EXIT;
        }

        volume_proto->wbuf_write_end_count++;

        EXIT:
        /* Release the page buffer on every path; it was previously leaked. */
        if (tmp_buf != NULL) {
                yfree((void **)&tmp_buf);
        }
        return err;
}

/*
 * Read the table of write-buffer chunk ids into chunk_ids.
 *
 * Issues one read of LSV_WBUF_TOTAL_CHUNK_NUM * sizeof(lsv_u32_t) bytes from
 * chunk LSV_PRIM_CHUNK_ID at byte offset u.wbuf_page_id * LSV_PAGE_SIZE.
 * chunk_ids must have room for that many bytes.
 *
 * Returns 0 on success, -1 if the read fails.
 */
int lsv_wal_load_chunk_id(lsv_volume_proto_t *volume_proto, lsv_u32_t *chunk_ids){
	lsv_io_t io;
	int rc;

	lsv_io_init(&io,
		    &volume_proto->volume_proto->chkid,
		    LSV_PRIM_CHUNK_ID,
		    volume_proto->u.wbuf_page_id * LSV_PAGE_SIZE,
		    LSV_WBUF_TOTAL_CHUNK_NUM * sizeof(lsv_u32_t),
		    0);

	rc = lsv_volume_chunk_read_data_i(volume_proto, &io, (lsv_s8_t *)chunk_ids);
	if (unlikely(rc != 0)) {
		DERROR("get chunk_id failed\n");
		return -1;
	}

	return 0;
}

/*
 * Read size bytes from a log chunk into buf.
 *
 * The read starts at byte offset LSV_WBUF_PAGE_SIZE * page_idx inside chunk
 * chunk_id. buf must be able to hold size bytes.
 *
 * Returns 0 on success, -1 if the underlying read fails (the original error
 * code is logged but not propagated).
 */
int lsv_wal_read_data(lsv_volume_proto_t *volume_proto, lsv_s8_t * buf, lsv_u32_t chunk_id, lsv_s32_t page_idx, lsv_s32_t size) {
	lsv_io_t io;
	int rc;

	lsv_io_init(&io,
		    &volume_proto->volume_proto->chkid,
		    chunk_id,
		    LSV_WBUF_PAGE_SIZE * page_idx,
		    size,
		    0);

	rc = lsv_volume_chunk_read_data_i(volume_proto, &io, buf);
	if (unlikely(rc != 0)) {
		DERROR("chunk[%d] load failed, err: %d\n", chunk_id, rc);
		return -1;
	}

	return 0;
}

/*
 * Hand one recovered record to the in-memory write buffer.
 *
 * Builds an IO descriptor for (lba, size), copies size bytes from buf into a
 * freshly initialised buffer_t and passes both to
 * volume_proto_mem_append_io().
 *
 * Returns 0 on success, -1 if the append fails.
 *
 * NOTE(review): buffer_new is never released in this function. Confirm that
 * volume_proto_mem_append_io() takes ownership of its contents; if it only
 * copies, the buffer leaks on every call.
 */
int lsv_wal_commit_io(lsv_volume_proto_t *volume_proto, lsv_s8_t * buf, lsv_u64_t lba, lsv_s32_t size) {
	lsv_io_t io;
	buffer_t buffer_new;
	int rc;

	io_init(&io, NULL, NULL, lba, size, 0);

	mbuffer_init(&buffer_new, 0);
	mbuffer_copy(&buffer_new, (const char *)buf, size);

	rc = volume_proto_mem_append_io(volume_proto, &io, &buffer_new, 0);
	if (unlikely(rc != 0)) {
		DERROR("commit wbuf write failed\n");
		return -1;
	}

	return 0;
}



/*
 * Replay the on-disk log records into the in-memory write buffer.
 *
 * chunk_ids is scanned in groups of LSV_WBUF_CHUNK_NUM * 2 entries (one
 * group per chunk block). Scanning stops at the first group whose leading id
 * is 0 or passes the LSV_WBUF_READY_FLAG test. Within a group, records are
 * read back to back: one header page (wbuf_chunk_block_head) followed by
 * wc_bh->size payload bytes, which may continue into the following chunks of
 * the group. Each payload is passed to lsv_wal_commit_io(). A group ends at a
 * chunk id of 0 or at the end marker written by lsv_wal_append_end().
 *
 * Returns 0 on success, -ENOMEM if a payload buffer cannot be allocated, or
 * -1 if a read or a commit fails.
 */
int lsv_wal_load_redo(lsv_volume_proto_t *volume_proto, lsv_u32_t *chunk_ids) {
	int err = 0;
	lsv_s32_t i = 0;
	lsv_s32_t j = 0;
	lsv_s32_t chunk_block_num = 0;
	lsv_s32_t page_idx = 0;
	wbuf_chunk_block_head * wc_bh = NULL;
	lsv_s8_t page_buf[LSV_WBUF_PAGE_SIZE];

	for(i = 0; i < LSV_WBUF_TOTAL_CHUNK_NUM; i += LSV_WBUF_CHUNK_NUM * 2){
		/*
		 * NOTE(review): the flag test only matches when
		 * LSV_WBUF_READY_FLAG is bit 0 - confirm against its definition.
		 */
		if(0 == chunk_ids[i] || 1 == (chunk_ids[i] & LSV_WBUF_READY_FLAG)){
			DINFO("chunk_block num is %d, load complete!\n", chunk_block_num);
			break;
		}

		page_idx = 0;
		/* j only advances when the current chunk has been fully consumed. */
		for(j = 0; j < LSV_WBUF_CHUNK_NUM * 2; ){
			if(0 == chunk_ids[i + j]){
				DINFO("chunk_block[%d-%d]'s %d chunks load complete....\n", LSV_WBUF_TOTAL_CHUNK_NUM, i, j);
				break;
			}

			/* Read the record header page. */
			memset(page_buf, 0, LSV_WBUF_PAGE_SIZE);
			err = lsv_wal_read_data(volume_proto, page_buf, chunk_ids[i + j], page_idx, LSV_WBUF_PAGE_SIZE);
			if(unlikely(err)){
				DERROR("chunk[%d] %d load failed, err: %d\n", i + j, chunk_ids[i + j], err);
				err = -1;
				return err;
			}

			page_idx ++;

			/* End marker: nothing more to replay in this chunk block. */
			wc_bh = (wbuf_chunk_block_head *)page_buf;
			if(wc_bh->lba == 0 &&
				wc_bh->size == 0 &&
				wc_bh->crc == 0 &&
				wc_bh->tv.tv_sec == LSV_WBUF_MAX_TIMESTAMP &&
				wc_bh->tv.tv_usec == LSV_WBUF_MAX_TIMESTAMP){
				break;
			}

			/*
			 * Buffer for the whole payload.
			 * NOTE(review): wc_bh->size is taken from disk without
			 * validation; a corrupt header can request an arbitrary size.
			 */
			lsv_s8_t * wbuf_buf = NULL;
			err = ymalloc((void **)&wbuf_buf, wc_bh->size);
			if(unlikely(err)){
				DERROR("malloc for wbuf_buf size : %d failed\n", wc_bh->size);
				err = -ENOMEM;
				return err;
			}
			memset(wbuf_buf, 0, wc_bh->size);

			lsv_s32_t left = wc_bh->size;
			while(left > 0){
				/*
				 * Read what is still outstanding, capped at the end of
				 * the current chunk. The cap must be compared with the
				 * bytes left, not with the total record size, or a record
				 * continuing into a later chunk is over-read.
				 */
				lsv_s32_t chunk_left = LSV_WBUF_PAGE_SIZE * (LSV_WBUF_PAGE_NUM - page_idx);
				lsv_s32_t read_size = (left < chunk_left) ? left : chunk_left;

				err = lsv_wal_read_data(volume_proto, wbuf_buf + wc_bh->size - left, chunk_ids[i + j], page_idx, read_size);
				if(unlikely(err)){
					DERROR("chunk[%d] %d load failed, err: %d\n", i + j, chunk_ids[i + j], err);
					yfree((void **)&wbuf_buf);
					err = -1;
					return err;
				}

				left -= read_size;
				/*
				 * NOTE(review): assumes the payload size is a multiple of
				 * LSV_WBUF_PAGE_SIZE; otherwise the cursor lands short of
				 * the next header - confirm against the writer.
				 */
				page_idx += read_size/LSV_WBUF_PAGE_SIZE;
				if(page_idx == LSV_WBUF_PAGE_NUM){
					page_idx = 0;
					j++;
				}
			}

			err = lsv_wal_commit_io(volume_proto, wbuf_buf, wc_bh->lba, wc_bh->size);
			if(unlikely(err)){
				DERROR("commit wbuf write failed\n");
				yfree((void **)&wbuf_buf);
				err = -1;
				return err;
			}
			yfree((void **)&wbuf_buf);
		}

		/* One more chunk block replayed; reported in the completion log. */
		chunk_block_num++;
	}

        return err;
}

